package pkg

import (
	"context"
	"errors"
	amqp "github.com/rabbitmq/amqp091-go"
	"log"
	"sync"
	"time"
)

// AmqpChannel wraps an *amqp.Channel obtained from an AmqpConnection and
// keeps track of its connection state and of the consumers registered on it.
type AmqpChannel struct {
	// amqpConnection is the owning connection that real channels are requested from.
	amqpConnection *AmqpConnection
	// realChannel is the underlying AMQP channel; it stays nil until connect
	// has obtained one.
	realChannel    *amqp.Channel
	// connected is set by connect once a real channel has been obtained and is
	// cleared when the channel is reported closed or Disconnect is called.
	connected      bool
	// closed is set by Disconnect and when the monitor loop stops.
	closed         bool
	// consumers holds the registered consumers, keyed by consumer name.
	consumers sync.Map
}

// connect keeps asking the owning connection for a real channel until one is
// obtained, or until the connection or this channel has been marked closed.
// Between unsuccessful attempts it waits 500ms.
func (c *AmqpChannel) connect() {
	for {
		// Nothing (more) to do once closed or already connected.
		if c.amqpConnection.closed || c.closed || c.connected {
			return
		}
		realChannel := c.amqpConnection.getRealChannel()
		if realChannel == nil {
			// No channel was returned; back off, then let the guard above
			// decide whether another attempt is still wanted.
			time.Sleep(time.Millisecond * 500)
			continue
		}
		// A channel was obtained: record it and mark the wrapper connected.
		c.realChannel = realChannel
		c.connected = true
	}
}

// monitorConnect waits for the current real channel to report that it closed
// and, while neither the owning connection nor this channel is marked closed,
// reconnects through connect. When the loop ends it marks the channel closed.
// NewAmqpChannel starts it in its own goroutine.
func (c *AmqpChannel) monitorConnect() {
	// alive reports whether both the owning connection and this channel are still open.
	alive := func() bool {
		return !(c.amqpConnection.closed || c.closed)
	}

	for alive() {
		notify := make(chan *amqp.Error)
		c.realChannel.NotifyClose(notify)

		// Block until the library signals the close; a nil value carries no
		// error text, so the logged reason stays empty.
		var reason string
		if closeErr := <-notify; closeErr != nil {
			reason = closeErr.Error()
		}
		log.Printf("%s amqp channel 断开 %s\n", c.amqpConnection.logID, reason)
		c.connected = false

		// The channel dropped: reconnect unless a shutdown is in progress.
		if alive() {
			log.Printf("%s amqp channel 断开,执行重连\n", c.amqpConnection.logID)
			c.connect()
		}
	}

	c.closed = true
	log.Printf("%s amqp channel monitor 停止\n", c.amqpConnection.logID)
}

// Disconnect marks the channel as closed, closes the underlying real channel
// when one exists, and finally clears the connected flag.
func (c *AmqpChannel) Disconnect() {
	c.closed = true
	if c.realChannel != nil {
		// Best-effort close: the returned error is intentionally discarded.
		_ = c.realChannel.Close()
	}
	c.connected = false
}

// NewAmqpChannel creates an AmqpChannel bound to amqpConnection, runs connect
// synchronously, starts the monitor goroutine and returns the new channel.
func NewAmqpChannel(amqpConnection *AmqpConnection) *AmqpChannel {
	// connected and closed start out false via their zero values.
	c := &AmqpChannel{amqpConnection: amqpConnection}
	c.connect()
	go c.monitorConnect()
	return c
}

// AddConsumer registers consumerName and starts a goroutine that consumes
// queueName on the real channel, forwarding every delivery to the returned
// channel. Deliveries are consumed with autoAck disabled.
//
// If the channel is not connected at call time, an already-closed channel is
// returned and nothing is registered. Otherwise the goroutine re-applies Qos
// and re-subscribes (with 500ms pauses) whenever the subscription fails or
// drops, and closes the returned channel once the consumer has been removed or
// the channel or its owning connection is marked closed.
func (c *AmqpChannel) AddConsumer(queueName string, consumerName string, prefetch int) chan amqp.Delivery {
	out := make(chan amqp.Delivery)
	if !c.connected {
		close(out)
		return out
	}
	c.consumers.Store(consumerName, "")

	const retryDelay = time.Millisecond * 500

	// registered reports whether consumerName is still present in c.consumers.
	registered := func() bool {
		_, found := c.consumers.Load(consumerName)
		return found
	}

	go func() {
		for {
			// Stop for good once the consumer was removed or anything was closed.
			if !registered() || c.closed || c.amqpConnection.closed {
				break
			}
			// Wait while the channel is being re-established.
			if !c.connected {
				time.Sleep(retryDelay)
				continue
			}
			if qosErr := c.realChannel.Qos(prefetch, 0, false); qosErr != nil {
				time.Sleep(retryDelay)
				log.Printf("%s amqp consumer [%s] queueName [%s] qos 异常 %s\n", c.amqpConnection.logID, consumerName, queueName, qosErr.Error())
				continue
			}
			deliveries, consumeErr := c.realChannel.Consume(queueName, consumerName, false, false, false, false, nil)
			if consumeErr != nil {
				log.Printf("%s amqp consumer [%s] queueName [%s] consume 异常 %s\n", c.amqpConnection.logID, consumerName, queueName, consumeErr.Error())
				time.Sleep(retryDelay)
				continue
			}
			log.Printf("%s consumer [%s] queueName [%s] consume 成功\n", c.amqpConnection.logID, consumerName, queueName)

			// Forward deliveries until the library closes the subscription channel.
			for delivery := range deliveries {
				out <- delivery
			}

			// The subscription ended; if nobody asked for a shutdown, log it
			// and pause before the outer loop subscribes again.
			stillRegistered := registered()
			if !c.closed && !c.amqpConnection.closed && stillRegistered {
				log.Printf("%s consumer [%s] queueName [%s] consume 断开,执行重连\n", c.amqpConnection.logID, consumerName, queueName)
				time.Sleep(retryDelay)
			}
		}
		close(out)
		log.Printf("%s consumer [%s] queueName [%s] 停止\n", c.amqpConnection.logID, consumerName, queueName)
	}()

	return out
}

// RemoveConsumer unregisters consumerName and, when a real channel exists,
// cancels that consumer on it.
func (c *AmqpChannel) RemoveConsumer(consumerName string) {
	c.consumers.Delete(consumerName)
	if c.realChannel == nil {
		return
	}
	// Best-effort cancel: the returned error is intentionally discarded.
	_ = c.realChannel.Cancel(consumerName, false)
}

// SendToExchangeWithType publishes body to exchangeName under routingKey,
// using contentType as the message content type. It returns a "not connect"
// error when no real channel has been obtained yet; otherwise it returns
// whatever the publish call returns.
func (c *AmqpChannel) SendToExchangeWithType(exchangeName string, routingKey string, body string, contentType string) error {
	if c.realChannel == nil {
		return errors.New("not connect")
	}
	message := amqp.Publishing{
		ContentType: contentType,
		Body:        []byte(body),
	}
	// The mandatory and immediate flags are both left off.
	return c.realChannel.PublishWithContext(context.Background(), exchangeName, routingKey, false, false, message)
}

// SendToExchange publishes body to exchangeName under routingKey with the
// "text/plain" content type. It returns a "not connect" error when no real
// channel has been obtained yet.
func (c *AmqpChannel) SendToExchange(exchangeName string, routingKey string, body string) error {
	const plainText = "text/plain"
	return c.SendToExchangeWithType(exchangeName, routingKey, body, plainText)
}

// SendToQueueWithType sends body with the given contentType to queueName by
// publishing with an empty exchange name and the queue name as routing key.
func (c *AmqpChannel) SendToQueueWithType(queueName string, body string, contentType string) error {
	const noExchange = ""
	return c.SendToExchangeWithType(noExchange, queueName, body, contentType)
}

// SendToQueue sends body to queueName by publishing with an empty exchange
// name and the queue name as routing key.
func (c *AmqpChannel) SendToQueue(queueName string, body string) error {
	const noExchange = ""
	return c.SendToExchange(noExchange, queueName, body)
}

// DeclareQueue declares a queue with the given durable and autoDelete
// settings and returns the name reported by the declaration. It returns a
// "not connect" error when no real channel has been obtained yet, and an empty
// name together with the error when the declaration fails.
func (c *AmqpChannel) DeclareQueue(queueName string, durable bool, autoDelete bool) (string, error) {
	if c.realChannel == nil {
		return "", errors.New("not connect")
	}
	// exclusive and noWait are both false; no extra arguments are passed.
	queue, err := c.realChannel.QueueDeclare(queueName, durable, autoDelete, false, false, nil)
	if err != nil {
		return "", err
	}
	return queue.Name, nil
}

// DeleteQueue deletes queueName, honouring the ifUnused and ifEmpty
// conditions. The count returned by the underlying call is discarded. It
// returns a "not connect" error when no real channel has been obtained yet.
func (c *AmqpChannel) DeleteQueue(queueName string, ifUnused bool, ifEmpty bool) error {
	if c.realChannel == nil {
		return errors.New("not connect")
	}
	if _, err := c.realChannel.QueueDelete(queueName, ifUnused, ifEmpty, false); err != nil {
		return err
	}
	return nil
}

// DeclareExchange declares an exchange of the given kind.
// kind is one of: direct, fanout, topic, headers.
// It returns a "not connect" error when no real channel has been obtained yet.
func (c *AmqpChannel) DeclareExchange(exchangeName, kind string, durable, autoDelete bool) error {
	if c.realChannel == nil {
		return errors.New("not connect")
	}
	// internal and noWait are both false; no extra arguments are passed.
	err := c.realChannel.ExchangeDeclare(exchangeName, kind, durable, autoDelete, false, false, nil)
	return err
}

// DeleteExchange deletes exchangeName, honouring the ifUnused condition.
// It returns a "not connect" error when no real channel has been obtained yet.
func (c *AmqpChannel) DeleteExchange(exchangeName string, ifUnused bool) error {
	if c.realChannel == nil {
		return errors.New("not connect")
	}
	err := c.realChannel.ExchangeDelete(exchangeName, ifUnused, false)
	return err
}

// Bind binds queueName to exchangeName under routingKey.
// It returns a "not connect" error when no real channel has been obtained yet.
func (c *AmqpChannel) Bind(queueName string, routingKey string, exchangeName string) error {
	if c.realChannel == nil {
		return errors.New("not connect")
	}
	// noWait is false and no extra arguments are passed.
	err := c.realChannel.QueueBind(queueName, routingKey, exchangeName, false, nil)
	return err
}

// Unbind removes the binding of queueName to exchangeName under routingKey.
// It returns a "not connect" error when no real channel has been obtained yet.
func (c *AmqpChannel) Unbind(queueName string, routingKey string, exchangeName string) error {
	if c.realChannel == nil {
		return errors.New("not connect")
	}
	err := c.realChannel.QueueUnbind(queueName, routingKey, exchangeName, nil)
	return err
}
